package com.atguigu.gmall.realtime.app.func;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

/**
 * Debezium deserialization schema that converts each change record into a JSON string.
 *
 * <p>The emitted JSON object carries four keys:
 * <ul>
 *   <li>{@code databases} - the second dot-separated segment of the record topic</li>
 *   <li>{@code table} - the third dot-separated segment of the record topic</li>
 *   <li>{@code type} - the lower-cased Debezium operation (e.g. {@code create}, {@code delete})</li>
 *   <li>{@code data} - the fields of the record's {@code after} struct; an empty object when
 *       there is no {@code after} image</li>
 * </ul>
 *
 * <p>Records that carry no payload or no recognisable operation are skipped instead of
 * failing the job with a {@link NullPointerException}.
 */
public class MyStringDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {

    /** Minimum number of dot-separated topic segments needed to read database and table. */
    private static final int MIN_TOPIC_SEGMENTS = 3;

    /**
     * Converts one Debezium source record into a JSON string and emits it.
     *
     * @param sourceRecord the change record to convert
     * @param collector    receives the resulting JSON string
     * @throws IllegalStateException if the record topic has fewer than three dot-separated segments
     * @throws Exception             if reading the record fails
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        // A record without a value has nothing to convert (and Envelope.operationFor would
        // dereference the null value), so skip it.
        if (sourceRecord.value() == null) {
            return;
        }

        // Determine the operation type; null means the record is not a recognisable change event.
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        if (operation == null) {
            return;
        }
        // Locale.ROOT keeps the lower-casing independent of the JVM's default locale.
        String type = operation.toString().toLowerCase(java.util.Locale.ROOT);

        // Database and table names are taken from the topic, split on the literal '.'.
        // Presumably the topic has the form <server>.<database>.<table> - verify against the source config.
        String topic = sourceRecord.topic();
        String[] topicParts = topic == null ? new String[0] : topic.split("\\.");
        if (topicParts.length < MIN_TOPIC_SEGMENTS) {
            throw new IllegalStateException(
                    "Unexpected topic '" + topic + "': expected at least "
                            + MIN_TOPIC_SEGMENTS + " dot-separated segments");
        }
        String dbName = topicParts[1];
        String tableName = topicParts[2];

        // Copy the "after" image of the row into its own JSON object.
        Struct valueStruct = (Struct) sourceRecord.value();
        JSONObject dataJson = structToJson(valueStruct.getStruct("after"));

        // Assemble the envelope: database, table, type and data.
        JSONObject resultJson = new JSONObject();
        // NOTE(review): the key is "databases" (plural); the original comment suggests "database"
        // may have been intended. Left unchanged because downstream consumers may read this key.
        resultJson.put("databases", dbName);
        resultJson.put("table", tableName);
        resultJson.put("type", type);
        resultJson.put("data", dataJson);

        // Emit the deserialized result.
        collector.collect(resultJson.toJSONString());
    }

    /**
     * Copies every field of the given struct into a new JSON object, keyed by field name.
     *
     * @param struct the struct to copy; may be {@code null}
     * @return a JSON object with one entry per schema field, or an empty object for {@code null}
     */
    private static JSONObject structToJson(Struct struct) {
        JSONObject json = new JSONObject();
        if (struct == null) {
            return json;
        }
        for (Field field : struct.schema().fields()) {
            String fieldName = field.name();
            json.put(fieldName, struct.get(fieldName));
        }
        return json;
    }

    /**
     * Describes the type of the elements produced by this schema.
     *
     * @return type information for {@link String}
     */
    @Override
    public TypeInformation<String> getProducedType() {
        return TypeInformation.of(String.class);
    }
}
